{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<center>\n",
    "    <p style=\"text-align:center\">\n",
    "        <img alt=\"phoenix logo\" src=\"https://raw.githubusercontent.com/Arize-ai/phoenix-assets/9e6101d95936f4bd4d390efc9ce646dc6937fb2d/images/socal/github-large-banner-phoenix.jpg\" width=\"1000\"/>\n",
    "        <br>\n",
    "        <br>\n",
    "        <a href=\"https://arize.com/docs/phoenix/\">Docs</a>\n",
    "        |\n",
    "        <a href=\"https://github.com/Arize-ai/phoenix\">GitHub</a>\n",
    "        |\n",
    "        <a href=\"https://arize-ai.slack.com/join/shared_invite/zt-2w57bhem8-hq24MB6u7yE_ZF_ilOYSBw#/shared-invite/email\">Community</a>\n",
    "    </p>\n",
    "</center>\n",
    "<h1 align=\"center\">Tracing CrewAI with Arize Phoenix - Routing Workflow</h1>"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%pip install -q arize-phoenix opentelemetry-sdk opentelemetry-exporter-otlp crewai crewai_tools openinference-instrumentation-crewai"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "5-gPdVmIndw9"
   },
   "source": [
    "## Set up Keys and Dependencies"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Note: For this colab you'll need:\n",
    "\n",
    "*   OpenAI API key (https://openai.com/)\n",
    "*   Serper API key (https://serper.dev/)\n",
    "*   Phoenix API key (https://app.phoenix.arize.com/)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "from getpass import getpass\n",
    "\n",
    "# Prompt the user for their API keys if they haven't been set\n",
    "openai_key = os.getenv(\"OPENAI_API_KEY\", \"OPENAI_API_KEY\")\n",
    "serper_key = os.getenv(\"SERPER_API_KEY\", \"SERPER_API_KEY\")\n",
    "\n",
    "if openai_key == \"OPENAI_API_KEY\":\n",
    "    openai_key = getpass(\"Please enter your OPENAI_API_KEY: \")\n",
    "\n",
    "if serper_key == \"SERPER_API_KEY\":\n",
    "    serper_key = getpass(\"Please enter your SERPER_API_KEY: \")\n",
    "\n",
    "# Set the environment variables with the provided keys\n",
    "os.environ[\"OPENAI_API_KEY\"] = openai_key\n",
    "os.environ[\"SERPER_API_KEY\"] = serper_key"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "if \"PHOENIX_API_KEY\" not in os.environ:\n",
    "    os.environ[\"PHOENIX_API_KEY\"] = getpass(\"🔑 Enter your Phoenix API key: \")\n",
    "\n",
    "if \"PHOENIX_COLLECTOR_ENDPOINT\" not in os.environ:\n",
    "    os.environ[\"PHOENIX_COLLECTOR_ENDPOINT\"] = getpass(\"🔑 Enter your Phoenix Collector Endpoint\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "r9X87mdGnpbc"
   },
   "source": [
    "## Configure Tracing"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from phoenix.otel import register\n",
    "\n",
    "tracer_provider = register(project_name=\"crewai-agents\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "vYT-EU56ni94"
   },
   "source": [
    "# Instrument CrewAI"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from openinference.instrumentation.crewai import CrewAIInstrumentor\n",
    "\n",
    "CrewAIInstrumentor().instrument(skip_dep_check=True, tracer_provider=tracer_provider)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Define your Working Agents"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import nest_asyncio\n",
    "from crewai import Agent, Crew, Process, Task\n",
    "from crewai.flow import Flow, listen, router, start\n",
    "from pydantic import BaseModel\n",
    "\n",
    "research_analyst = Agent(\n",
    "    role=\"Senior Research Analyst\",\n",
    "    goal=\"Gather and summarize data on the requested topic.\",\n",
    "    backstory=\"Expert in tech market trends.\",\n",
    "    allow_delegation=False,\n",
    ")\n",
    "\n",
    "content_strategist = Agent(\n",
    "    role=\"Tech Content Strategist\",\n",
    "    goal=\"Craft an article outline based on provided research.\",\n",
    "    backstory=\"Storyteller who turns data into narratives.\",\n",
    "    allow_delegation=False,\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "From here, there are two ways to do this -- through a routing Agent or through ```@router()``` decorator in Flows (allows you to define conditional routing logic based on the output of a method)."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Option 1: Define your logic for Router Agent to classify the query & run corresponding Agent"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "routerAgent = Agent(\n",
    "    role=\"Router\",\n",
    "    goal=\"Classify each query as either 'research' or 'content outline'.\",\n",
    "    backstory=\"Triage bot for content workflows.\",\n",
    "    verbose=False,\n",
    ")\n",
    "\n",
    "\n",
    "def route(user_input: str, router):\n",
    "    router_task = Task(\n",
    "        description=user_input, agent=router, expected_output=\"One word: 'research' or 'content'\"\n",
    "    )\n",
    "    router_classify = Crew(\n",
    "        agents=[router], tasks=[router_task], process=Process.sequential, verbose=False\n",
    "    )\n",
    "    router_results = router_classify.kickoff()\n",
    "    return router_results\n",
    "\n",
    "\n",
    "def type_of_task(router_results):\n",
    "    if isinstance(router_results, list):\n",
    "        result = router_results[0]\n",
    "        result_text = result.text if hasattr(result, \"text\") else str(result)\n",
    "    else:\n",
    "        result_text = (\n",
    "            router_results.text if hasattr(router_results, \"text\") else str(router_results)\n",
    "        )\n",
    "    task_type = result_text.strip().lower()\n",
    "\n",
    "    return task_type\n",
    "\n",
    "\n",
    "def working_agent(task_type, user_input: str):\n",
    "    if \"research\" in task_type:\n",
    "        agent = research_analyst\n",
    "        label = \"Research Analyst\"\n",
    "    else:\n",
    "        agent = content_strategist\n",
    "        label = \"Content Strategist\"\n",
    "\n",
    "    work_task = Task(description=user_input, agent=agent, expected_output=\"Agent response\")\n",
    "    worker_crew = Crew(agents=[agent], tasks=[work_task], process=Process.sequential, verbose=True)\n",
    "    work_results = worker_crew.kickoff()\n",
    "    if isinstance(work_results, list):\n",
    "        output = work_results[0].text if hasattr(work_results[0], \"text\") else str(work_results[0])\n",
    "    else:\n",
    "        output = work_results.text if hasattr(work_results, \"text\") else str(work_results)\n",
    "\n",
    "    print(f\"\\n=== Routed to {label} ({task_type}) ===\\n{output}\\n\")\n",
    "    return output"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Examples Runs"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# ─── 4) Example Runs ─────────────────────────────────────────────────────────\n",
    "for query in [\n",
    "    \"Please research the latest AI safety papers.\",\n",
    "    \"Outline an article on AI safety trends.\",\n",
    "]:\n",
    "    router_output = route(query, routerAgent)\n",
    "    task_output = type_of_task(router_output)\n",
    "    working_agent(task_output, query)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Option 2: Define your logic for ```@router()``` Decorator to define routing logic"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "nest_asyncio.apply()\n",
    "\n",
    "\n",
    "# Define Flow State\n",
    "class RoutingState(BaseModel):\n",
    "    query: str = \"\"\n",
    "    route: str = \"\"\n",
    "\n",
    "\n",
    "# Define Structured Flow\n",
    "class RoutingFlow(Flow[RoutingState]):\n",
    "    def __init__(self, query: str):\n",
    "        super().__init__(state=RoutingState(query=query))\n",
    "\n",
    "    @start()\n",
    "    def handle_query(self):\n",
    "        print(f\"📥 Incoming Query: {self.state.query}\")\n",
    "\n",
    "    @router(handle_query)\n",
    "    def decide_route(self):\n",
    "        if \"research\" in self.state.query.lower():\n",
    "            self.state.route = \"research\"\n",
    "            return \"research\"\n",
    "        else:\n",
    "            self.state.route = \"outline\"\n",
    "            return \"outline\"\n",
    "\n",
    "    @listen(\"research\")\n",
    "    def run_research(self):\n",
    "        task = Task(\n",
    "            description=self.state.query,\n",
    "            expected_output=\"Summary of findings on AI safety\",\n",
    "            agent=research_analyst,\n",
    "        )\n",
    "        crew = Crew(\n",
    "            agents=[research_analyst], tasks=[task], process=Process.sequential, verbose=True\n",
    "        )\n",
    "        crew.kickoff()\n",
    "\n",
    "    @listen(\"outline\")\n",
    "    def run_content_strategy(self):\n",
    "        task = Task(\n",
    "            description=self.state.query,\n",
    "            expected_output=\"An article outline about the given topic\",\n",
    "            agent=content_strategist,\n",
    "        )\n",
    "        crew = Crew(\n",
    "            agents=[content_strategist], tasks=[task], process=Process.sequential, verbose=True\n",
    "        )\n",
    "        crew.kickoff()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Examples Runs"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "queries = [\n",
    "    \"Please research the latest AI safety papers.\",\n",
    "    \"Outline an article on AI safety trends.\",\n",
    "]\n",
    "\n",
    "for query in queries:\n",
    "    flow = RoutingFlow(query=query)\n",
    "    flow.kickoff()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "fH0uVMgxpLql"
   },
   "source": [
    "### Check your Phoenix project to view the traces and spans from your runs."
   ]
  }
 ],
 "metadata": {
  "language_info": {
   "name": "python"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 0
}
